package com.nstskj.study.netty.tcp.netty.protocol.executor.work;

import com.nstskj.study.netty.tcp.netty.protocol.session.base.AbstractSession;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author ZhouChuGang
 * @version 1.0
 * @project nstskj-study-netty-tcp-spring
 * @date 2021/5/4 11:46
 * @Description 抽象的 tcp 协议工作者
 */
@Slf4j
public abstract class AbstractTcpProtocolWork<T> implements Runnable {

    /**
     * 堵塞队列
     */
    private final BlockingQueue<T> blockingQueue;

    /**
     * tcp会话 session
     */
    private final AbstractSession abstractSession;

    /**
     * 运行标记
     */
    private final AtomicBoolean runFlag = new AtomicBoolean(false);

    /**
     * 当前执行的线程
     */
    private volatile Thread thread;

    /**
     * 工作线程名字，用于打印
     */
    private final String workName;

    /**
     * 构造 默认不执行
     *
     * @param blockingQueue   堵塞队列
     * @param abstractSession session
     * @param workName        工作线程名字，用于打印
     */
    protected AbstractTcpProtocolWork(BlockingQueue<T> blockingQueue, AbstractSession abstractSession, String workName) {
        this.blockingQueue = blockingQueue;
        this.abstractSession = abstractSession;
        this.workName = workName;
    }


    /**
     * 收到堵塞队列的数据后的处理
     *
     * @param t               队列中的元素
     * @param abstractSession 当前处理的会话
     * @throws Exception
     */
    public abstract void doHandlerWork(T t, AbstractSession abstractSession) throws Exception;

    @Override
    public void run() {
        this.thread = Thread.currentThread();
        //标记运行
        this.runFlag.compareAndSet(false, true);
        log.info("开启工作线程 name {} sessionId {}", this.workName, this.abstractSession.getSessionId());
        while (this.runFlag.get()) {
            try {
                T msgData = this.blockingQueue.take();
                doHandlerWork(msgData, this.abstractSession);
            } catch (Exception e) {
                //这里忽略异常
            }
        }
        log.info("!关闭工作线程 name {} sessionId {}", this.workName, this.abstractSession.getSessionId());
    }

    public String getWorkName() {
        return workName;
    }

    public BlockingQueue<T> getBlockingQueue() {
        return blockingQueue;
    }

    public AbstractSession getAbstractSession() {
        return abstractSession;
    }

    public boolean isRun() {
        return runFlag.get();
    }

    /**
     * 停止
     */
    public void stop() {
        if (this.runFlag.compareAndSet(true, false)) {
            if (this.thread != null) {
                //中断唤醒
                this.thread.interrupt();
            }
        }
    }

}
